/*
 * Copyright (c) 2017-2019, Lindenis Tech. Ltd.
 * All rights reserved.
 *
 * File:
 *
 * Description:
 *
 * Author:
 *      xiaoshujun@lindeni.com
 *
 * Create Date:
 *      2021/04/13
 *
 * History:
 *
 */
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <pthread.h>
#if ((defined MELIS_OS) || (defined WIN32))
#else
#include <sys/prctl.h>
#endif

#include "osal_types.h"
#include "osal_queue.h"
#include "osal_message.h"

#include "osal_log.h"

/*
 * Per-instance state of a message channel. Allocated by osal_msg_create()
 * and handed back to the caller as an opaque _handle_t.
 */
typedef struct _msg_context_t
{
    /* Underlying queue handle obtained from osal_queue_create(). */
    _handle_t           h_queue;

    /*
     * Message callback. When non-NULL, osal_msg_create() starts a worker
     * thread that calls it for every dequeued message; osal_msg_send()
     * calls it directly. When NULL, no worker thread is started.
     */
    on_msg_fn           on_msg;
    /* Caller-supplied pointer passed as the first argument of on_msg. */
    void *              user;

    /* Worker loop flag: set to 1 before the thread is created, cleared by
     * osal_msg_destroy() to make the loop exit. */
    int                 running;
    /* Worker thread id; stays zeroed when no thread was created. */
    pthread_t           tid;
} msg_ctx_t;

/*
 * One queued message. Instances are heap-allocated by osal_msg_post() and
 * released by whichever function dequeues them.
 */
typedef struct _osal_msg_t
{
    /* Message identifier supplied by the poster. */
    int         msg;
    /* The 'size' argument given to osal_msg_post(), stored as-is. */
    int         size;
    /* Heap copy of the payload made by osal_msg_post(), or NULL when no
     * payload was copied (data == NULL or size <= 0). */
    void *      data;
} osal_msg_t;

/*
 * Worker thread entry point: pulls messages off the context's queue and
 * dispatches each one to the registered callback.
 *
 * param   - the msg_ctx_t created by osal_msg_create().
 * returns - always NULL.
 *
 * The loop runs while p_ctx->running is non-zero. The dequeue uses a
 * 1000 ms timeout so the flag is re-checked periodically. Each message and
 * its payload copy are freed after the callback returns.
 */
static void * thread_message_queue(void *param)
{
    msg_ctx_t * p_ctx = (msg_ctx_t *)param;
    _handle_t h_queue = p_ctx->h_queue;
    on_msg_fn on_msg = p_ctx->on_msg;
    void * user = p_ctx->user;
    int ret = 0;

    /* Same condition as the <sys/prctl.h> include at the top of the file,
     * so prctl() is only referenced where its declaration is available. */
#if !((defined MELIS_OS) || (defined WIN32))
    prctl(PR_SET_NAME, (unsigned long)"msg", 0, 0, 0);
#endif

    while (p_ctx->running)
    {
        osal_msg_t * p_msg = NULL;
        ret = osal_dequeue_timeout(h_queue, (void**)&p_msg, 1000);
        if (ret < 0 || p_msg == NULL) {
            /* Timeout/abort, or nothing was actually delivered. */
            continue;
        }

        on_msg(user, p_msg->msg, p_msg->data, p_msg->size);

        free(p_msg->data);
        free(p_msg);
    }

    return NULL;
}

/*
 * Create a message channel.
 *
 * max      - forwarded to osal_queue_create().
 * msg_name - forwarded to osal_queue_create().
 * user     - opaque pointer handed back to on_msg.
 * on_msg   - message callback; when non-NULL a worker thread is started
 *            that delivers queued messages to it. When NULL no thread is
 *            created.
 *
 * Returns the new handle, or a NULL handle if allocation, queue creation
 * or thread creation fails.
 */
_handle_t osal_msg_create(int max, char * msg_name, void * user, on_msg_fn on_msg)
{
    /* Zero-initialised so tid/running/h_queue start out cleared. */
    msg_ctx_t * ctx = (msg_ctx_t *)calloc(1, sizeof(msg_ctx_t));
    if (ctx == NULL)
    {
        return (_handle_t)NULL;
    }

    ctx->h_queue = osal_queue_create(max, msg_name);
    if (ctx->h_queue == 0)
    {
        free(ctx);
        return (_handle_t)NULL;
    }

    ctx->user = user;
    ctx->on_msg = on_msg;

    if (on_msg)
    {
        /* The flag must be set before the worker starts polling it. */
        ctx->running = 1;
        if (pthread_create(&ctx->tid, NULL, thread_message_queue, (void *)ctx) != 0)
        {
            osal_queue_destroy(ctx->h_queue);
            free(ctx);
            return (_handle_t)NULL;
        }
    }

    return (_handle_t)ctx;
}

/*
 * Tear down a message channel created by osal_msg_create().
 *
 * h_msg - handle to destroy; a NULL handle is ignored.
 *
 * Stops the worker thread (if one was started), destroys the underlying
 * queue and releases the context itself. The handle must not be used
 * after this call returns.
 *
 * NOTE(review): messages still queued are not drained here; call
 * osal_msg_flush() first if their storage must be reclaimed - confirm
 * against osal_queue_destroy() semantics.
 */
void osal_msg_destroy(_handle_t h_msg)
{
    msg_ctx_t * p_ctx = (msg_ctx_t *)h_msg;
    if (p_ctx == NULL)
    {
        return;
    }

    /* Clear the loop flag first, then abort the queue so the worker can
     * observe the flag and exit. */
    p_ctx->running = 0;
    if (p_ctx->h_queue)
    {
        osal_queue_abort(p_ctx->h_queue);
    }

    if (p_ctx->tid)
    {
        pthread_join(p_ctx->tid, 0);
        p_ctx->tid = 0;
    }

    if (p_ctx->h_queue)
    {
        osal_queue_destroy(p_ctx->h_queue);
        p_ctx->h_queue = 0;
    }

    /* The context was malloc'ed in osal_msg_create(); release it now that
     * the worker has been joined and can no longer touch it. */
    free(p_ctx);
}

/*
 * Queue a message for asynchronous delivery.
 *
 * h_msg - channel handle.
 * msg   - message identifier.
 * data  - optional payload; when non-NULL and size > 0 it is copied, so the
 *         caller keeps ownership of its buffer.
 * size  - payload size in bytes; recorded in the message as given.
 *
 * Returns 0 on success, -1 if an allocation or the enqueue fails.
 */
int osal_msg_post(_handle_t h_msg, int msg, void * data, int size)
{
    msg_ctx_t * ctx = (msg_ctx_t *)h_msg;

    /* Zero-filled so 'data' starts out NULL. */
    osal_msg_t * node = (osal_msg_t *)calloc(1, sizeof(osal_msg_t));
    if (node == NULL)
    {
        return -1;
    }

    node->msg = msg;
    node->size = size;

    if (data != NULL && size > 0)
    {
        node->data = malloc(size);
        if (node->data == NULL)
        {
            free(node);
            return -1;
        }
        memcpy(node->data, data, size);
    }

    if (osal_enqueue(ctx->h_queue, node, QUEUE_BLOCK) < 0)
    {
        /* Ownership was not transferred; release everything we built. */
        free(node->data);
        free(node);
        return -1;
    }

    return 0;
}

/*
 * Deliver a message synchronously by calling the channel's callback on the
 * caller's thread, bypassing the queue.
 *
 * h_msg - channel handle.
 * msg   - message identifier.
 * data  - payload pointer, passed through to the callback uncopied.
 * size  - payload size, passed through to the callback.
 *
 * Returns the callback's return value, or -1 when the handle is NULL or
 * the channel was created without a callback.
 */
int osal_msg_send(_handle_t h_msg, int msg, void * data, int size)
{
    msg_ctx_t * p_ctx = (msg_ctx_t *)h_msg;

    /* osal_msg_create() accepts a NULL callback, so it must be checked
     * before being called. */
    if (p_ctx == NULL || p_ctx->on_msg == NULL)
    {
        return -1;
    }

    return p_ctx->on_msg(p_ctx->user, msg, data, size);
}

/*
 * Fetch one message from a channel that was created without a callback.
 *
 * h_msg      - channel handle.
 * msg        - out: receives the message identifier; may be NULL.
 * data       - out: buffer that receives the payload; may be NULL.
 * size       - capacity of 'data' in bytes; a longer payload is truncated.
 * timeout_ms - forwarded to osal_dequeue_timeout().
 *
 * Returns the (non-negative) result of osal_dequeue_timeout() on success.
 * Returns -1 when the handle is NULL, when the channel has a callback
 * registered, or when the dequeue reports success without delivering a
 * message; otherwise the negative result of osal_dequeue_timeout().
 *
 * The queued message and its payload copy are freed before returning.
 */
int osal_msg_get(_handle_t h_msg, int * msg, void * data, int size, int timeout_ms)
{
    msg_ctx_t * p_ctx = (msg_ctx_t *)h_msg;
    osal_msg_t * p_msg = NULL;
    int ret = 0;

    if (p_ctx == NULL)
    {
        return -1;
    }

    if (p_ctx->on_msg)
    {
        loge("you should listen msg by callback function!");
        return -1;
    }

    /* Dequeue into a real pointer: the queue stores osal_msg_t pointers,
     * which do not fit into the caller's int. */
    ret = osal_dequeue_timeout(p_ctx->h_queue, (void**)&p_msg, timeout_ms);
    if (ret < 0)
    {
        return ret;
    }
    if (p_msg == NULL)
    {
        return -1;
    }

    if (msg != NULL)
    {
        *msg = p_msg->msg;
    }

    if (data != NULL && size > 0 && p_msg->data != NULL && p_msg->size > 0)
    {
        /* Never write past the caller's buffer. */
        int n = (p_msg->size < size) ? p_msg->size : size;
        memcpy(data, p_msg->data, (size_t)n);
    }

    free(p_msg->data);
    free(p_msg);

    return ret;
}

/*
 * Report the value osal_queue_get_nb() returns for the channel's queue.
 *
 * h_msg - channel handle.
 */
int osal_msg_get_nb(_handle_t h_msg)
{
    return osal_queue_get_nb(((msg_ctx_t *)h_msg)->h_queue);
}

/*
 * Discard every message currently waiting in the channel's queue.
 *
 * h_msg    - channel handle.
 * free_msg - not consulted by this implementation; each dequeued message
 *            and its payload copy are always freed.
 *
 * Messages are pulled with a zero timeout until the dequeue reports
 * failure.
 */
void osal_msg_flush(_handle_t h_msg, int free_msg)
{
    _handle_t queue = ((msg_ctx_t *)h_msg)->h_queue;
    osal_msg_t * pending;

    for (;;)
    {
        pending = NULL;
        if (osal_dequeue_timeout(queue, (void**)&pending, 0) < 0)
        {
            break;
        }

        free(pending->data);
        free(pending);
    }
}

